#include "config.h"

#include <ctype.h>
#include <netdb.h>
#include <errno.h>
#include <uuid/uuid.h>

#define DBG_SUBSYS S_LIBYNET

#include "ynet_net.h"
#include "ynet_rpc.h"
#include "minirpc.h"
#include "net_global.h"
#include "net_msg.h"
#include "configure.h"
#include "main_loop.h"
#include "../sock/sock_udp.h"
#include "dbg.h"

/*
 * NOTE(review): neither constant is referenced in the visible part of this
 * file. The units are presumably seconds -- confirm before relying on them.
 */
#define WAIT_INTERVAL 1
#define MINIRPC_TIMEOUT 5

/*
 * Per-request minirpc header. It is laid over the start of a msg_head_t
 * payload (msg_head_t.buf is cast to this type by the receive/dispatch code),
 * so the layout is part of the on-wire format.
 */
typedef struct {
        minirpc_op_t op;   /* operation code; used as the index into minirpc_t.progs */
        uuid_t uuid;       /* request id generated by the sender; used to detect duplicates */
        char buf[0];       /* operation-specific request bytes follow the header */
} minirpc_head_t;

/* One registered operation: the callback and the opaque pointer passed back to it. */
typedef struct {
        minirpc_request_handler handler; /* NULL while the op is not registered */
        void *context;                   /* passed as the handler's first argument */
} minirpc_prog_t;

/* State of the minirpc service. */
typedef struct {
        int sd;                 /* UDP socket set up by udp_sock_listen() in minirpc_init() */
        struct list_head list;  /* minirpc_id_t entries for requests that were recently answered */
        sy_rwlock_t lock;       /* taken (write mode) around socket receive/send and list access */
        minirpc_prog_t progs[MINIRPC_OP_END]; /* handler table indexed by operation code */
} minirpc_t;

/* The single service instance; assigned by minirpc_init(), NULL until then. */
static minirpc_t *__minirpc__;

/*
 * Record of a request that has already been answered, kept on minirpc_t.list.
 * `hook` must remain the first member: the list walkers in this file cast the
 * struct list_head pointer directly to minirpc_id_t *.
 */
typedef struct {
        struct list_head hook; /* linkage into minirpc_t.list */
        uuid_t uuid;           /* uuid copied from the answered request */
        time_t time;           /* gettime() value taken when the reply was sent */
} minirpc_id_t;

/*
 * Receive one datagram from the minirpc socket into @req and validate it.
 *
 * @req      destination buffer; up to MAX_BUF_LEN bytes are written to it
 * @reqlen   out: number of bytes received
 * @addr     out: sender address as filled in by recvfrom()
 * @addrlen  in/out: size of @addr, updated by recvfrom()
 *
 * Returns 0 when @req holds a well-formed request whose uuid is not on the
 * list of recently answered requests. Returns ENOENT when no data is pending
 * or the uuid was already answered, EAGAIN when the datagram is malformed
 * (too short, length mismatch, bad crc), or the error reported by the lock,
 * ioctl() or recvfrom().
 *
 * The whole operation runs under the write lock of the service instance.
 */
static int __minirpc_recv(msg_head_t *req, int *reqlen,
                          struct sockaddr *addr, socklen_t *addrlen)
{
        int ret, skip, toread;
        char buf[MAX_BUF_LEN];
        minirpc_head_t *head;
        struct list_head *pos, *n;
        minirpc_id_t *id;
        minirpc_t *minirpc;
        struct list_head *list;

        minirpc = __minirpc__;
        list = &minirpc->list;

        ret = sy_rwlock_wrlock(&minirpc->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = ioctl(minirpc->sd, FIONREAD, &toread);
        if (unlikely(ret)) {
                ret = errno;
                GOTO(err_lock, ret);
        }

        if (toread == 0) {
                ret = ENOENT;
                GOTO(err_lock, ret);
        }

        ret = recvfrom(minirpc->sd, req, MAX_BUF_LEN, 0, addr, addrlen);
        if (ret < 0) {
                ret = errno;
                GOTO(err_lock, ret);
        }

        *reqlen = ret;

        DBUG("minirpc got msg\n");

        /*
         * req->len is only meaningful once a full msg_head_t has arrived;
         * check that first so the length test does not read stale bytes.
         */
        if ((size_t)ret < sizeof(*req)
            || req->len + sizeof(*req) != (LLU)ret) {
                DWARN("========recv a dirty pakage , now throw it away=======\n");
                ret = EAGAIN;
                GOTO(err_lock, ret);
        }

        if (req->crc != crc32_sum(req->buf, req->len)) {
                DWARN("crc fail %x:%x\n", req->crc, crc32_sum(req->buf, req->len));
                ret = EAGAIN;
                GOTO(err_lock, ret);
        }

        /*
         * The payload must hold a complete minirpc_head_t before its uuid
         * (here) and op (in the dispatcher) may be read.
         */
        if (req->len < sizeof(*head)) {
                DWARN("payload too short for minirpc head: %u\n",
                      (unsigned)req->len);
                ret = EAGAIN;
                GOTO(err_lock, ret);
        }

        head = (void *)req->buf;
        skip = 0;
        list_for_each_safe(pos, n, list) {
                /* valid because hook is the first member of minirpc_id_t */
                id = (void *)pos;
                if (uuid_compare(id->uuid, head->uuid) == 0) {
                        uuid_unparse(id->uuid, buf);
                        DBUG("skip %s\n", buf);
                        skip = 1;
                        break;
                }
        }

        if (skip) {
                ret = ENOENT;
                GOTO(err_lock, ret);
        }

        sy_rwlock_unlock(&minirpc->lock);

        return 0;
err_lock:
        sy_rwlock_unlock(&minirpc->lock);
err_ret:
        return ret;
}

/*
 * Send @rep back to the requester and remember the request uuid.
 *
 * @head     header of the request being answered (source of the uuid)
 * @rep      reply message; crc and uuid are filled in here, len/retval/buf
 *           must already be set by the caller
 * @addr     destination address
 * @addrlen  size of @addr
 *
 * On a successful send, a minirpc_id_t carrying the uuid and the current
 * gettime() value is appended to the instance list so that the receive path
 * can recognise a repeated request. A send failure is only logged.
 * Runs under the write lock of the service instance.
 */
static void __minirpc_reply(const minirpc_head_t *head, msg_head_t *rep,
                           const struct sockaddr *addr, socklen_t addrlen)
{
        int ret;
        minirpc_id_t *id;
        minirpc_t *minirpc;
        struct list_head *list;

        minirpc = __minirpc__;
        list = &minirpc->list;

        ret = sy_rwlock_wrlock(&minirpc->lock);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        rep->crc = crc32_sum(rep->buf, rep->len);
        uuid_copy(rep->uuid, head->uuid);
        ret = sendto(minirpc->sd, rep, rep->len + sizeof(*rep), 0, addr, addrlen);
        if (ret < 0) {
                /* capture errno first: the unlock below may overwrite it */
                ret = errno;
                sy_rwlock_unlock(&minirpc->lock);
                DWARN("ret (%u) %s\n", ret, strerror(ret));
                return;
        }

        ret = ymalloc1((void **)&id, sizeof(*id));
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        uuid_copy(id->uuid, head->uuid);
        id->time = gettime();
        list_add_tail(&id->hook, list);

        DBUG("minirpc got msg %u\n", head->op);

        sy_rwlock_unlock(&minirpc->lock);
}

/* Heap-allocated argument handed to a per-request worker thread. */
typedef struct {
        struct sockaddr addr; /* sender address as returned by the receive path */
        socklen_t addrlen;    /* length of addr */
        char buf[0];          /* copy of the received message (msg_head_t followed by its payload) */
} arg_t;

/*
 * Thread entry point: dispatch one received request and send the reply.
 *
 * @_arg  an arg_t allocated by the caller; this function frees it on every
 *        path before the thread exits.
 *
 * The handler registered for the request's op is called with the request
 * payload and writes its answer into a stack reply buffer. A handler error
 * is returned to the peer as retval with an empty body; an op outside the
 * valid range is answered with EINVAL. Nothing is sent when the op has no
 * handler or the request is too short to contain a minirpc_head_t.
 */
static void *__minirpc_exec__(void *_arg)
{
        int ret;
        arg_t *arg;
        const minirpc_head_t *head;
        minirpc_prog_t *prog;
        minirpc_t *minirpc;
        const msg_head_t *req;
        char _rep[MAX_BUF_LEN];
        msg_head_t *rep;
        uint32_t _addr;

        arg = _arg;
        req = (void *)arg->buf;
        rep = (void *)_rep;
        minirpc = __minirpc__;

        /*
         * Without a complete minirpc_head_t neither op nor uuid can be read,
         * and the payload length computed below would wrap around.
         */
        if (req->len < sizeof(*head)) {
                DWARN("request too short, len %u\n", (unsigned)req->len);
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        head = (void *)req->buf;

        _addr = ((struct sockaddr_in *)&arg->addr)->sin_addr.s_addr;
        DBUG("op[%u] from %s\n",  head->op, _inet_ntoa(_addr));

        if (head->op > MINIRPC_OP_NONE && head->op < MINIRPC_OP_END) {
                prog = &minirpc->progs[head->op];

                if (prog->handler == NULL) {
                        DWARN("prog %u not registered, request from %s\n",
                              head->op, _inet_ntoa(_addr));
                        ret = ENOSYS;
                        GOTO(err_ret, ret);
                }

                ANALYSIS_BEGIN(0);

                ret = prog->handler(prog->context, rep->buf, &rep->len,
                                    head->buf, req->len - sizeof(*head));
                if (unlikely(ret)) {
                        rep->retval = ret;
                        rep->len = 0;
                } else {
                        rep->retval = 0;
                }

                ANALYSIS_END(0, 1000, NULL);
        } else {
                rep->retval = EINVAL;
                /* the reply path uses len for the crc and the send size */
                rep->len = 0;
        }

        __minirpc_reply(head, rep, &arg->addr, arg->addrlen);

        yfree((void **)&_arg);
        pthread_exit(NULL);
err_ret:
        yfree((void **)&_arg);
        pthread_exit(NULL);
}

/*
 * Receive one request and hand it to a detached worker thread.
 *
 * The received message and the sender address are copied into a heap
 * arg_t; ownership of that allocation passes to the worker thread once
 * pthread_create() succeeds.
 *
 * Returns 0 when a worker was started or when the receive path reported
 * ENOENT (nothing to do). Any other receive, allocation or thread-creation
 * error is returned to the caller.
 */
static int __minirpc_exec()
{
        int ret, reqlen = 0;
        struct sockaddr addr;
        char _req[MAX_BUF_LEN];
        socklen_t addrlen;
        msg_head_t *req;
        pthread_t th;
        pthread_attr_t ta;
        arg_t *arg;

        req = (void *)_req;
        addrlen = sizeof(struct sockaddr);
        ret = __minirpc_recv(req, &reqlen, &addr, &addrlen);
        if (unlikely(ret)) {
                if (ret == ENOENT)
                        goto out;
                else
                        GOTO(err_ret, ret);
        }

        ret = ymalloc((void **)&arg, sizeof(*arg) + reqlen);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        arg->addr = addr;
        arg->addrlen = addrlen;
        memcpy(arg->buf, req, reqlen);

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        ret = pthread_create(&th, &ta, __minirpc_exec__, arg);
        /* the attribute object is no longer needed once the thread exists */
        (void) pthread_attr_destroy(&ta);
        if (unlikely(ret))
                GOTO(err_free, ret);

out:
        return 0;
err_free:
        yfree((void **)&arg);
err_ret:
        return ret;
}

/*
 * Drop stale entries from the list of answered request ids.
 *
 * Under the instance write lock, every minirpc_id_t whose timestamp is more
 * than 2 gettime() units older than the current gettime() value is unlinked
 * and freed.
 */
static void __minirpc_cleanup()
{
        int err;
        time_t current, age;
        struct list_head *cursor, *next;
        minirpc_id_t *entry;
        char text[MAX_BUF_LEN];

        err = sy_rwlock_wrlock(&__minirpc__->lock);
        if (unlikely(err))
                UNIMPLEMENTED(__DUMP__);

        current = gettime();
        list_for_each_safe(cursor, next, &__minirpc__->list) {
                /* hook is the first member, so the node pointer is the entry */
                entry = (void *)cursor;
                age = current - entry->time;
                if (age > 2) {
                        list_del(&entry->hook);
                        uuid_unparse(entry->uuid, text);
                        DBUG("remove %s\n", text);
                        yfree1((void **)&entry);
                }
        }

        sy_rwlock_unlock(&__minirpc__->lock);
}

/*
 * Body of the acceptor thread.
 *
 * Loops forever: polls the service socket for input, runs the stale-id
 * cleanup whenever the poll reports ETIME or EINTR, and otherwise processes
 * one request. EAGAIN from request processing is ignored; any other error
 * ends the loop in YASSERT(0).
 *
 * NOTE(review): cleanup is only reached from the ETIME/EINTR branch, so it
 * does not run while every poll finds data.
 */
static void * __minirpc_acceptor_worker(void *arg)
{
        int ret;

        (void) arg;

        main_loop_hold();

        for (;;) {
                DBUG("minirpc wait\n");
                ret = sock_poll_sd(__minirpc__->sd, 1000 * 1000, POLLIN);
                if (unlikely(ret)) {
                        if (ret != ETIME && ret != EINTR)
                                GOTO(err_ret, ret);

                        __minirpc_cleanup();
                        continue;
                }

                ANALYSIS_BEGIN(0);

                ret = __minirpc_exec();
                if (unlikely(ret)) {
                        if (ret != EAGAIN)
                                GOTO(err_ret, ret);

                        continue;
                }

                ANALYSIS_END(0, 1000 * 10, NULL);
        }

        return NULL;
err_ret:
        YASSERT(0);
        return NULL;
}

/*
 * Start the detached acceptor thread.
 *
 * Blocks, sleeping one second per iteration, until the service instance has
 * been published by minirpc_init().
 *
 * Returns 0 on success or the pthread_create() error code.
 */
int minirpc_start()
{
        int ret;
        pthread_t th;
        pthread_attr_t ta;

        while (__minirpc__ == NULL) {
                DWARN("wait minirpc inited\n");
                sleep(1);
        }

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        ret = pthread_create(&th, &ta, __minirpc_acceptor_worker, NULL);
        /* release the attribute object whether or not the thread started */
        (void) pthread_attr_destroy(&ta);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

int minirpc_init(const char *hostname, const char *port)
{
        int ret;
        minirpc_t *minirpc;

        ret = ymalloc1((void **)&minirpc, sizeof(*minirpc));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(minirpc, 0x0, sizeof(*minirpc));

        ret = sy_rwlock_init(&minirpc->lock, "minirpc.lock");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        INIT_LIST_HEAD(&minirpc->list);

        DINFO("listen %s:%s\n", hostname, port);

        ret = udp_sock_listen(&minirpc->sd, hostname, port, 0);
        if (unlikely(ret)) {
                DWARN("port %s used, listener only\n", port);
                sleep(1);
                EXIT(EAGAIN);
                //GOTO(err_ret, ret);
        }

        __minirpc__ = minirpc;

#if !ENABLE_START_PARALLEL
        ret = minirpc_start();
        if (unlikely(ret))
                GOTO(err_ret, ret);
#endif

        return 0;
err_ret:
        return ret;
}

/*
 * Register the handler for one operation code.
 *
 * @op       operation code; must be a valid index into the handler table
 *           (0 <= op < MINIRPC_OP_END)
 * @handler  callback invoked by the dispatcher for requests carrying @op
 * @context  opaque pointer handed to @handler as its first argument
 *
 * Overwrites any handler previously stored for @op. The service instance
 * must already have been created by minirpc_init().
 */
void minirpc_request_register(int op, minirpc_request_handler handler, void *context)
{
        minirpc_prog_t *prog;

        /* an out-of-range op would write outside the progs[] array */
        YASSERT(op >= 0 && op < MINIRPC_OP_END);

        prog = &__minirpc__->progs[op];

        prog->handler = handler;
        prog->context = context;
}

/*
 * Resolve @name and open one UDP socket per resolved address.
 *
 * @sd     out: array receiving the socket descriptors
 * @count  in: capacity of @sd; out: number of sockets opened
 * @name   host name to resolve
 *
 * Each address is connected on gloconf.control_port. If any connect fails,
 * the sockets opened so far are closed and the error is returned. Resolver
 * errors EALREADY, EAGAIN and ENETUNREACH are mapped to EAGAIN and go
 * through USLEEP_RETRY; a lookup that yields no entry returns ENONET.
 *
 * Returns 0 on success, otherwise an errno-style code.
 */
static int __connect_all(int *sd, int *count, const char *name)
{
        int ret, herrno = 0, i, retry = 0;
        struct hostent hostbuf, *result;
        char buf[MAX_BUF_LEN], port[MAX_NAME_LEN];
        struct in_addr sin;
        const char *ip;

retry:
        ret = gethostbyname_r(name, &hostbuf, buf, sizeof(buf),  &result, &herrno);
        if (unlikely(ret)) {
                /*
                 * gethostbyname_r() reports failure through its return value;
                 * errno is not the documented error channel, so the returned
                 * code is used as-is.
                 */
                if (ret == EALREADY || ret == EAGAIN || ret == ENETUNREACH) {
                        ret = EAGAIN;
                        USLEEP_RETRY(err_ret, ret, retry, retry, 2, (100 * 1000));
                } else
                        GOTO(err_ret, ret);
        }

        if (result == NULL) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        snprintf(port, MAX_NAME_LEN, "%d", gloconf.control_port);
        for (i = 0; result->h_addr_list[i] != NULL; i++) {
                YASSERT(i < *count);

                sin = *(struct in_addr *)(result->h_addr_list[i]);
                ip = inet_ntoa(sin);
                DBUG("addr count %s\n", ip);
                ret = udp_sock_connect(&sd[i], ip, port, YNET_RPC_BLOCK);
                if (unlikely(ret)) {
                        /* sd[i] was not opened; close sd[i-1] .. sd[0] */
                        while (i > 0) {
                                close(sd[i - 1]);
                                i--;
                        }

                        GOTO(err_ret, ret);
                }
        }

        *count = i;

        return 0;
err_ret:
        return ret;
}

/*
 * Send one minirpc request to every address of @hostname and wait for a
 * reply.
 *
 * @op        operation code placed in the request header
 * @rep       buffer receiving the reply (filled by net_recvmulti())
 * @replen    reply length argument forwarded to net_recvmulti()
 * @req       request payload
 * @reqlen    number of bytes in @req
 * @hostname  peer to contact
 * @timeout   wait limit; multiplied by 1000 * 1000 before being passed on
 *
 * Returns 0 on success. On failure the error is passed through
 * _errno_net(): ENONET when the host is blocked, EAGAIN for an empty
 * hostname, or whatever the connect/send/receive step reported.
 */
int minirpc_request_wait(int op, char *rep, int *replen, const char *req, int reqlen,
                         const char *hostname, int timeout)
{
        int ret, socks[YNET_SOCK_MAX], nsock, idx;
        char *packet;
        minirpc_head_t *hdr;

        packet = mem_cache_calloc(MEM_CACHE_8K, 1);

        if (net_blocked(hostname)) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        if (hostname[0] == '\0') {
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        ANALYSIS_BEGIN(0);

        YASSERT(reqlen + sizeof(msg_head_t) <= BUF_LEN);

        nsock = YNET_SOCK_MAX;
        ret = __connect_all(socks, &nsock, hostname);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        /* build the request: zeroed header, op, fresh uuid, then the payload */
        hdr = (void *)packet;
        memset(hdr, 0x0, sizeof(*hdr));
        hdr->op = op;
        uuid_generate(hdr->uuid);
        memcpy(hdr->buf, req, reqlen);

        ret = net_sendmulti(socks, nsock, packet, reqlen + sizeof(*hdr));
        if (unlikely(ret))
                GOTO(err_sd, ret);

        ret = net_recvmulti(socks, nsock, rep, replen, timeout * 1000 * 1000, &hdr->uuid);
        if (unlikely(ret)) {
                DBUG("hostname %s ret (%u) %s\n", hostname, ret, strerror(ret));
                goto err_sd;
        }

        idx = 0;
        while (idx < nsock) {
                close(socks[idx]);
                idx++;
        }

        ANALYSIS_END(0, 1000 * 1000, NULL);

        mem_cache_free(MEM_CACHE_8K, packet);

        return 0;
err_sd:
        idx = 0;
        while (idx < nsock) {
                close(socks[idx]);
                idx++;
        }
err_ret:
        mem_cache_free(MEM_CACHE_8K, packet);
        return _errno_net(ret);
}
